
[client] introduce client-side event aggregation#6394

Open
dmitri-netbird wants to merge 35 commits into
netbirdio:mainfrom
dmitri-netbird:dmitri-event-aggregation

Conversation

@dmitri-netbird

@dmitri-netbird dmitri-netbird commented Jun 10, 2026


Describe your changes

To reduce the frequency at which flow events are emitted, this PR implements client-side aggregation of flow events.

Events are aggregated over a time window by destination address, destination port, protocol, and ICMP code (for ICMP traffic). The total numbers of start, end, and drop events are tracked, as well as the number of transmitted and received bytes and packets.
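
As a rough illustration of the grouping described above (not the PR's actual code), the following Go sketch folds the events of one window into per-key totals. The `event`, `key`, and `aggregate` types and the `aggregateEvents` and `sampleEvents` functions are hypothetical names chosen for this example, and packet counters are left out for brevity.

```go
package main

import (
	"fmt"
	"net/netip"
)

// event is a hypothetical, simplified stand-in for the client's flow event type.
type event struct {
	kind     string // "start", "end", or "drop"
	destAddr netip.Addr
	destPort uint16
	protocol uint8 // IANA protocol number, e.g. 1 = ICMP, 6 = TCP, 17 = UDP
	icmpCode uint8 // only meaningful for ICMP
	rxBytes  uint64
	txBytes  uint64
}

// key lists the fields the PR description says events are grouped by.
type key struct {
	destAddr netip.Addr
	destPort uint16
	protocol uint8
	icmpCode uint8
}

// aggregate holds the per-key totals for one aggregation window.
type aggregate struct {
	starts, ends, drops uint64
	rxBytes, txBytes    uint64
}

// aggregateEvents folds all events of one window into per-key totals.
func aggregateEvents(events []event) map[key]aggregate {
	out := make(map[key]aggregate)
	for _, e := range events {
		k := key{e.destAddr, e.destPort, e.protocol, e.icmpCode}
		a := out[k]
		switch e.kind {
		case "start":
			a.starts++
		case "end":
			a.ends++
		case "drop":
			a.drops++
		}
		a.rxBytes += e.rxBytes
		a.txBytes += e.txBytes
		out[k] = a
	}
	return out
}

// sampleEvents returns made-up events: two for one TCP flow, one UDP drop.
func sampleEvents() []event {
	dst := netip.MustParseAddr("10.0.0.2")
	return []event{
		{kind: "start", destAddr: dst, destPort: 443, protocol: 6, rxBytes: 100},
		{kind: "end", destAddr: dst, destPort: 443, protocol: 6, rxBytes: 200, txBytes: 50},
		{kind: "drop", destAddr: dst, destPort: 53, protocol: 17},
	}
}

func main() {
	for k, a := range aggregateEvents(sampleEvents()) {
		fmt.Printf("%v:%d proto=%d -> %+v\n", k.destAddr, k.destPort, k.protocol, a)
	}
}
```

Using a comparable struct as the map key keeps the grouping rule in one place, so adding or removing a key field changes the aggregation granularity without touching the loop.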

Issue ticket number and link

Stack

Checklist

  • Is it a bug fix
  • Is a typo/documentation fix
  • Is a feature enhancement
  • It is a refactor
  • Created tests that fail without the change (if possible)
  • This change does not modify the public API, gRPC protocols, functionality behavior, CLI / service flags, or introduce a new feature — OR I have discussed it with the NetBird team beforehand (link the issue / Slack thread in the description). See CONTRIBUTING.md.

By submitting this pull request, you confirm that you have read and agree to the terms of the Contributor License Agreement.

Documentation

Select exactly one:

  • I added/updated documentation for this change
  • Documentation is not needed for this change (explain why)

Docs PR URL (required if "docs added" is checked)

Paste the PR link from https://github.com/netbirdio/docs here:

netbirdio/docs#800

Summary by CodeRabbit

  • New Features
    • Network flow events now include an aggregation window with start/end timestamps.
    • Events now report per-flow counters for starts, ends, and drops.
    • Flow event delivery is more reliable: events are aggregated and unacknowledged events are retried automatically until ACKed.
  • Documentation
    • Updated API schema and generated models to include the new aggregation window fields and start/end/drop counters.

Signed-off-by: Dmitri <dmitri.external@netbird.io>
Signed-off-by: Dmitri <dmitri.external@netbird.io>
Signed-off-by: Dmitri <dmitri.external@netbird.io>
Signed-off-by: Dmitri <dmitri.external@netbird.io>
…gregation window

Signed-off-by: Dmitri <dmitri.external@netbird.io>
@dmitri-netbird dmitri-netbird self-assigned this Jun 10, 2026
@coderabbitai

coderabbitai Bot commented Jun 10, 2026

Contributor

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ffde2121-d1ae-4926-a5f7-93ed7518d8b1

📥 Commits

Reviewing files that changed from the base of the PR and between 1f1413e and 05308fb.

📒 Files selected for processing (1)
  • client/internal/netflow/store/event_aggregation_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • client/internal/netflow/store/event_aggregation_test.go

📝 Walkthrough

Walkthrough

Adds time-windowed event aggregation and ACK-based retry logic to the netflow client. Events are grouped into aggregation windows, sent with start/end/drop counters and window timestamps, tracked in an un-acked store, and retried with exponential backoff until acknowledged. Protobuf, OpenAPI schema, and generated Go types are extended accordingly.

Changes

Netflow ACK-based Event Retrying and Aggregation

| Layer / File(s) | Summary |
| --- | --- |
| Type system, aggregation contracts, and protobuf schema<br>client/internal/netflow/types/types.go, flow/proto/flow.proto | Introduces FlowEventAggregator and AggregatingStore interfaces, adds Event.Clone(), WindowStart/WindowEnd fields on Event, NumOfStarts/NumOfEnds/NumOfDrops on EventFields, updates FlowLogger to expose ResetAggregationWindow, and extends the protobuf FlowEvent and FlowFields messages with matching fields. |
| AggregatingMemory store implementation and tests<br>client/internal/netflow/store/memory.go, client/internal/netflow/store/event_aggregation_test.go | Implements AggregatingMemory embedding Memory with ResetAggregationWindow (atomic map swap plus window bounds) and GetAggregatedEvents (aggregation by src/dst IP, dst port, direction, protocol, ICMP type; byte/packet sums; start/end/drop counters; pseudorandom unique key for non-TCP/UDP/ICMP protocols). Unit tests cover all supported protocols, ICMP type variations, and unknown protocols. |
| Logger aggregation integration<br>client/internal/netflow/logger/logger.go | Updates Logger.Store field type to AggregatingStore, switches constructor to NewAggregatingMemoryStore(), and adds ResetAggregationWindow() delegation method. |
| Manager retry machinery and un-acked event tracking<br>client/internal/netflow/manager.go | Adds eventsWithoutAcks store and retryInterval to Manager, initializes both in NewManager, reworks startSender to reset the aggregation window and unconditionally enqueue sent events into eventsWithoutAcks, updates receiveACKs to delete acked IDs from eventsWithoutAcks, spawns a new startRetries goroutine with a 1-second ticker, 1-second grace period, and exponential backoff on failure, and expands toProtoEvent to include window bounds and counters. |
| Manager integration tests<br>client/internal/netflow/manager_integration_test.go | Adds a gRPC in-process testServer with a bidirectional streaming Events handler. TestSendEventReceiveAck verifies un-acked tracking before ACK delivery; TestRetryEvents verifies that a short retry interval triggers retries and that acking clears the un-acked store. |
| OpenAPI schema and generated types update<br>shared/management/http/api/openapi.yml, shared/management/http/api/types.gen.go | Extends NetworkTrafficEvent in the OpenAPI spec with num_of_starts, num_of_ends, num_of_drops, window_start, and window_end fields (added to required), reflected in the regenerated Go struct. |
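
The "atomic map swap plus window bounds" idea mentioned for ResetAggregationWindow can be sketched roughly as follows. This is a minimal illustration under assumptions, not the PR's implementation: `windowStore`, `record`, `reset`, and `demoReset` are hypothetical names, and the stored value is reduced to a simple counter per key.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// windowStore is a hypothetical store that counts events for the current window.
type windowStore struct {
	mu          sync.Mutex
	events      map[string]int // event key -> count (simplified)
	windowStart time.Time
}

func newWindowStore(now time.Time) *windowStore {
	return &windowStore{events: make(map[string]int), windowStart: now}
}

// record adds one event to the current window.
func (s *windowStore) record(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.events[key]++
}

// reset swaps in a fresh map under the lock and returns the finished window's
// contents together with its start and end times.
func (s *windowStore) reset(now time.Time) (map[string]int, time.Time, time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	finished, start := s.events, s.windowStart
	s.events = make(map[string]int)
	s.windowStart = now
	return finished, start, now
}

// demoReset records two events, closes the window, then records one more.
func demoReset() (finishedCount, currentCount int) {
	store := newWindowStore(time.Now())
	store.record("tcp/443")
	store.record("tcp/443")
	finished, _, _ := store.reset(time.Now())
	store.record("udp/53") // lands in the new window, not the finished one
	return finished["tcp/443"], len(store.events)
}

func main() {
	finished, current := demoReset()
	fmt.Println("finished window count:", finished, "keys in new window:", current)
}
```

Because the old map is handed to the caller and never written again by the store, the sender can aggregate it without holding the lock while new events keep arriving.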

Sequence Diagram(s)

sequenceDiagram
  rect rgba(173, 216, 230, 0.5)
    note over Manager,eventsWithoutAcks: Sender loop (per aggregation window tick)
  end
  participant Manager
  participant AggregatingMemory
  participant eventsWithoutAcks
  participant FlowServer

  Manager->>AggregatingMemory: ResetAggregationWindow()
  AggregatingMemory-->>Manager: window snapshot + aggregated events

  loop each aggregated event
    Manager->>FlowServer: send FlowEvent (WindowStart, WindowEnd, NumOfStarts/Ends/Drops)
    Manager->>eventsWithoutAcks: StoreEvent (regardless of send result)
  end

  FlowServer-->>Manager: FlowEventAck(event IDs)
  Manager->>eventsWithoutAcks: DeleteEvents(acked IDs)

  rect rgba(255, 200, 150, 0.5)
    note over Manager,FlowServer: startRetries goroutine (1s ticker)
  end
  loop every retryInterval
    Manager->>eventsWithoutAcks: GetEvents()
    eventsWithoutAcks-->>Manager: unacked events older than 1s grace
    Manager->>FlowServer: retry send FlowEvent
    alt send fails
      Manager->>Manager: exponential backoff (NextBackOff)
    else send succeeds
      Manager->>Manager: reset backoff, re-arm 1s timer
    end
  end
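
The un-acked tracking and retry flow in the diagram can be approximated with the Go sketch below. It is an illustration under assumptions only: `unackedStore`, `track`, `ack`, `due`, `nextBackoff`, and `demoDue` are hypothetical names, and the hand-written doubling backoff stands in for whatever backoff implementation the PR actually uses.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pending is a hypothetical record of an event that was sent but not yet acknowledged.
type pending struct {
	id     string
	sentAt time.Time
}

// unackedStore is a simplified stand-in for the eventsWithoutAcks store.
type unackedStore struct {
	mu     sync.Mutex
	events map[string]pending
}

func newUnackedStore() *unackedStore {
	return &unackedStore{events: make(map[string]pending)}
}

// track records an event right after a send attempt, whether or not it succeeded.
func (s *unackedStore) track(id string, sentAt time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.events[id] = pending{id: id, sentAt: sentAt}
}

// ack removes acknowledged events so they are no longer retried.
func (s *unackedStore) ack(ids ...string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, id := range ids {
		delete(s.events, id)
	}
}

// due returns the IDs of events that have waited longer than the grace period.
func (s *unackedStore) due(now time.Time, grace time.Duration) []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	var ids []string
	for id, p := range s.events {
		if now.Sub(p.sentAt) > grace {
			ids = append(ids, id)
		}
	}
	return ids
}

// nextBackoff doubles the delay after a failed send, capped at limit.
func nextBackoff(current, limit time.Duration) time.Duration {
	if next := current * 2; next < limit {
		return next
	}
	return limit
}

// demoDue tracks two events, acks one, and asks what is due two seconds later.
func demoDue() []string {
	store := newUnackedStore()
	start := time.Now()
	store.track("a", start)
	store.track("b", start)
	store.ack("a") // the server acknowledged event "a"
	return store.due(start.Add(2*time.Second), time.Second)
}

func main() {
	fmt.Println("due for retry:", demoDue())
	fmt.Println("next backoff:", nextBackoff(time.Second, 30*time.Second))
}
```

Storing every sent event, regardless of the send result, means a single code path (the retry loop) handles both failed sends and lost ACKs.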

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested reviewers

  • lixmal
  • crn4

Poem

🐇 Hop, hop, the packets fly,
No event lost beneath the sky!
We track each flow with windows bright,
Retry with backoff through the night.
Start, end, drop — we count them all,
ACK received? Clear the hall! 🎉

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 11.76% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The PR title '[client] introduce client-side event aggregation' directly and clearly describes the main feature being implemented—client-side aggregation of flow events. |
| Description check | ✅ Passed | The PR description covers the key aspects: what is being changed (client-aggregation of flow events), how events are aggregated (by destination address, port, protocol, ICMP code), and what metrics are tracked. However, it lacks an Issue ticket number and the Docs PR URL appears incomplete (shows '/800' but is formatted correctly). The checklist is mostly completed with the feature enhancement box checked and documentation marked as added. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



@CLAassistant

CLAassistant commented Jun 10, 2026


CLA assistant check
All committers have signed the CLA.

@codecov

codecov Bot commented Jun 10, 2026


Codecov Report

❌ Patch coverage is 64.38356% with 52 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (main@6c26178). Learn more about missing BASE report.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| flow/proto/flow.pb.go | 0.00% | 20 Missing ⚠️ |
| client/internal/netflow/store/memory.go | 80.39% | 9 Missing and 1 partial ⚠️ |
| flow/proto/flow_grpc.pb.go | 0.00% | 10 Missing ⚠️ |
| client/internal/netflow/types/types.go | 0.00% | 6 Missing ⚠️ |
| client/internal/netflow/manager.go | 92.85% | 2 Missing and 2 partials ⚠️ |
| client/internal/netflow/logger/logger.go | 33.33% | 2 Missing ⚠️ |
Additional details and impacted files
@@           Coverage Diff           @@
##             main    #6394   +/-   ##
=======================================
  Coverage        ?   30.66%           
=======================================
  Files           ?      900           
  Lines           ?   113342           
  Branches        ?        0           
=======================================
  Hits            ?    34756           
  Misses          ?    73988           
  Partials        ?     4598           
| Flag | Coverage Δ |
| --- | --- |
| client | 29.68% <64.38%> (?) |
| management | 29.54% <ø> (?) |
| proxy | 42.53% <ø> (?) |
| relay | 36.00% <ø> (?) |
| unit | 30.66% <64.38%> (?) |


Signed-off-by: Dmitri <dmitri.external@netbird.io>
Signed-off-by: Dmitri <dmitri.external@netbird.io>
Signed-off-by: Dmitri <dmitri.external@netbird.io>
Signed-off-by: Dmitri <dmitri.external@netbird.io>

@coderabbitai coderabbitai Bot left a comment

Contributor


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
client/internal/netflow/manager.go (1)

298-318: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Aggregated event counters are not populated in the protobuf message.

The aggregated NumOfStarts, NumOfEnds, and NumOfDrops fields computed by GetAggregatedEvents() (memory.go lines 96-122) are not set in the FlowFields protobuf message. The server will receive aggregated events with zeroed counters, losing the aggregation data.

🐛 Proposed fix
 	protoEvent := &proto.FlowEvent{
 		EventId:   event.ID[:],
 		Timestamp: timestamppb.New(event.Timestamp),
 		PublicKey: publicKey,
 		FlowFields: &proto.FlowFields{
 			FlowId:           event.FlowID[:],
 			RuleId:           event.RuleID,
 			Type:             proto.Type(event.Type),
 			Direction:        proto.Direction(event.Direction),
 			Protocol:         uint32(event.Protocol),
 			SourceIp:         event.SourceIP.AsSlice(),
 			DestIp:           event.DestIP.AsSlice(),
 			RxPackets:        event.RxPackets,
 			TxPackets:        event.TxPackets,
 			RxBytes:          event.RxBytes,
 			TxBytes:          event.TxBytes,
 			SourceResourceId: event.SourceResourceID,
 			DestResourceId:   event.DestResourceID,
+			NumOfStarts:      event.NumOfStarts,
+			NumOfEnds:        event.NumOfEnds,
+			NumOfDrops:       event.NumOfDrops,
 		},
 	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@client/internal/netflow/manager.go` around lines 298 - 318, The FlowFields
protobuf is missing the aggregated counters, so populate FlowFields.NumOfStarts,
FlowFields.NumOfEnds, and FlowFields.NumOfDrops from the nftypes.Event before
returning the proto.FlowEvent: in toProtoEvent(publicKey []byte, event
*nftypes.Event) add the three fields (NumOfStarts: event.NumOfStarts, NumOfEnds:
event.NumOfEnds, NumOfDrops: event.NumOfDrops) inside the proto.FlowFields
literal so aggregated counts computed by GetAggregatedEvents() are sent to the
server.
🧹 Nitpick comments (3)
client/internal/netflow/store/memory.go (2)

73-79: ⚡ Quick win

Rename icmpType field to icmpCode for clarity.

The aggregationKey.icmpType field is populated with v.ICMPCode (line 84), not v.ICMPType. Renaming the field to icmpCode will align the struct definition with its usage and improve code clarity.

♻️ Proposed fix
 type aggregationKey struct {
 	destAddr netip.Addr
 	destPort uint16
 	protocol uint8
-	icmpType uint8
+	icmpCode uint8
 	unique   int64 // used to prevent aggregation on non icmp/udp/tcp events
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@client/internal/netflow/store/memory.go` around lines 73 - 79, The struct
field aggregationKey.icmpType is misnamed because callers populate it with
v.ICMPCode; rename the field to icmpCode to match usage and improve clarity.
Update the aggregationKey definition (replace icmpType with icmpCode) and any
references to aggregationKey.icmpType throughout the codebase (e.g., places that
assign v.ICMPCode) so they compile and keep semantics unchanged.

123-127: 💤 Low value

Consider documenting the timestamp selection logic.

The condition aggregatedEvent.Timestamp.Compare(v.Timestamp) > 0 keeps the earliest event's timestamp, ID, and Type when aggregating multiple events. This is intentional (confirmed by tests), but the logic could benefit from a comment explaining that the earliest event's metadata is preserved to represent the aggregation window's start.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@client/internal/netflow/store/memory.go` around lines 123 - 127, Add a short
explanatory comment above the timestamp selection block explaining that the
condition aggregatedEvent.Timestamp.Compare(v.Timestamp) > 0 intentionally
preserves the earliest event's timestamp, ID and Type when merging events so the
aggregated record represents the start of the aggregation window; reference the
variables aggregatedEvent, v, Timestamp.Compare, ID and Type in the comment so
future readers understand the preserved metadata semantic.
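
To make the preserved-metadata behaviour concrete, here is a small Go sketch of a merge step that sums counters while keeping the earliest event's ID, type, and timestamp. It assumes a trimmed-down, hypothetical `event` type; `merge` and `example` are illustrative names and do not come from memory.go.

```go
package main

import (
	"fmt"
	"time"
)

// event is a hypothetical, trimmed-down flow event.
type event struct {
	id        string
	kind      string
	timestamp time.Time
	rxBytes   uint64
}

// merge folds next into agg. Counters are summed, while the ID, type, and
// timestamp of the earliest event are kept so that the aggregated record
// represents the start of the aggregation window.
func merge(agg, next event) event {
	if agg.timestamp.Compare(next.timestamp) > 0 {
		// agg is later than next, so next becomes the representative event.
		agg.id, agg.kind, agg.timestamp = next.id, next.kind, next.timestamp
	}
	agg.rxBytes += next.rxBytes
	return agg
}

// example merges an earlier "start" event into a later "end" event.
func example() event {
	base := time.Date(2026, time.June, 10, 12, 0, 0, 0, time.UTC)
	later := event{id: "e2", kind: "end", timestamp: base.Add(time.Second), rxBytes: 200}
	earlier := event{id: "e1", kind: "start", timestamp: base, rxBytes: 100}
	return merge(later, earlier)
}

func main() {
	got := example()
	fmt.Println(got.id, got.kind, got.rxBytes)
}
```

The result does not depend on the order in which events are visited, which matters when the events come from iterating over a map.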
client/internal/netflow/store/tcp_aggregation_test.go (1)

21-279: ⚡ Quick win

Add ICMP test coverage for aggregation logic.

The test suite only covers TCP protocol aggregation. Since the PR objectives mention ICMP aggregation by code, and GetAggregatedEvents has ICMP-specific logic (line 84 in memory.go uses ICMPCode as part of the aggregation key), adding at least one ICMP test case would verify that ICMP flows are aggregated correctly.

Consider adding a test case that verifies ICMP aggregation:

{
    description: "ICMP echo requests",
    events: []*types.Event{
        {
            EventFields: types.EventFields{
                Type:     types.TypeStart,
                Protocol: types.ICMP,
                DestIP:   ipAddr("2.2.2.2"),
                ICMPType: 8, // Echo Request
                ICMPCode: 0,
                RxBytes:  100,
            }},
        {
            EventFields: types.EventFields{
                Type:     types.TypeEnd,
                Protocol: types.ICMP,
                DestIP:   ipAddr("2.2.2.2"),
                ICMPType: 8,
                ICMPCode: 0,
                RxBytes:  200,
            }},
    },
    expected: []*types.Event{
        {
            EventFields: types.EventFields{
                Type:        types.TypeStart,
                Protocol:    types.ICMP,
                DestIP:      ipAddr("2.2.2.2"),
                ICMPType:    8,
                ICMPCode:    0,
                RxBytes:     300,
                NumOfStarts: 1,
                NumOfEnds:   1,
            }},
    },
},
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@client/internal/netflow/store/tcp_aggregation_test.go` around lines 21 - 279,
Add an ICMP test case to TestTcpAggregation to cover the ICMP-specific
aggregation key (ICMPCode) used by GetAggregatedEvents: create events with
Protocol types.ICMP and matching ICMPType/ICMPCode (e.g., echo request
ICMPType=8, ICMPCode=0) with start and end events that have RxBytes/TXBytes to
be summed, store them via NewAggregatingMemoryStore().StoreEvent, then include
an expected aggregated Event that verifies RxBytes/TXBytes are summed and
NumOfStarts/NumOfEnds reflect the inputs; reference TestTcpAggregation,
GetAggregatedEvents, NewAggregatingMemoryStore, and the ICMPCode/ICMPType fields
when adding the case.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@client/internal/netflow/store/memory.go`:
- Around line 81-132: In GetAggregatedEvents, the aggregation lookup key is
built with the ICMP code (aggregationKey.icmpType = v.ICMPCode) which groups by
the wrong field; change the lookupKey construction in GetAggregatedEvents to use
the ICMP Type field from the event (e.g., v.ICMPType) so aggregation groups by
type not code, and ensure aggregationKey.icmpType's expected type matches the
event field; update any related comments if present.

In `@flow/proto/generate.sh`:
- Around line 15-16: Uncomment and restore the pinned go install lines in
generate.sh for the protobuf generators so regeneration is reproducible:
re-enable the go install invocations for protoc-gen-go and protoc-gen-go-grpc
(the previously commented lines) and update their version tags to match the
repository's checked-in generators (protoc-gen-go v1.36.11 and
protoc-gen-go-grpc v1.6.1); leave any protoc binary handling unchanged but
ensure the script documents the required protoc version (protoc v7.34.1) if it
cannot be installed via go.

---

Outside diff comments:
In `@client/internal/netflow/manager.go`:
- Around line 298-318: The FlowFields protobuf is missing the aggregated
counters, so populate FlowFields.NumOfStarts, FlowFields.NumOfEnds, and
FlowFields.NumOfDrops from the nftypes.Event before returning the
proto.FlowEvent: in toProtoEvent(publicKey []byte, event *nftypes.Event) add the
three fields (NumOfStarts: event.NumOfStarts, NumOfEnds: event.NumOfEnds,
NumOfDrops: event.NumOfDrops) inside the proto.FlowFields literal so aggregated
counts computed by GetAggregatedEvents() are sent to the server.

---

Nitpick comments:
In `@client/internal/netflow/store/memory.go`:
- Around line 73-79: The struct field aggregationKey.icmpType is misnamed
because callers populate it with v.ICMPCode; rename the field to icmpCode to
match usage and improve clarity. Update the aggregationKey definition (replace
icmpType with icmpCode) and any references to aggregationKey.icmpType throughout
the codebase (e.g., places that assign v.ICMPCode) so they compile and keep
semantics unchanged.
- Around line 123-127: Add a short explanatory comment above the timestamp
selection block explaining that the condition
aggregatedEvent.Timestamp.Compare(v.Timestamp) > 0 intentionally preserves the
earliest event's timestamp, ID and Type when merging events so the aggregated
record represents the start of the aggregation window; reference the variables
aggregatedEvent, v, Timestamp.Compare, ID and Type in the comment so future
readers understand the preserved metadata semantic.

In `@client/internal/netflow/store/tcp_aggregation_test.go`:
- Around line 21-279: Add an ICMP test case to TestTcpAggregation to cover the
ICMP-specific aggregation key (ICMPCode) used by GetAggregatedEvents: create
events with Protocol types.ICMP and matching ICMPType/ICMPCode (e.g., echo
request ICMPType=8, ICMPCode=0) with start and end events that have
RxBytes/TXBytes to be summed, store them via
NewAggregatingMemoryStore().StoreEvent, then include an expected aggregated
Event that verifies RxBytes/TXBytes are summed and NumOfStarts/NumOfEnds reflect
the inputs; reference TestTcpAggregation, GetAggregatedEvents,
NewAggregatingMemoryStore, and the ICMPCode/ICMPType fields when adding the
case.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4e155db1-9e65-4af4-8192-353348a2ac12

📥 Commits

Reviewing files that changed from the base of the PR and between d770376 and 598558c.

⛔ Files ignored due to path filters (2)
  • flow/proto/flow.pb.go is excluded by !**/*.pb.go
  • flow/proto/flow_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (8)
  • client/internal/netflow/logger/logger.go
  • client/internal/netflow/manager.go
  • client/internal/netflow/manager_integration_test.go
  • client/internal/netflow/store/memory.go
  • client/internal/netflow/store/tcp_aggregation_test.go
  • client/internal/netflow/types/types.go
  • flow/proto/flow.proto
  • flow/proto/generate.sh

Comment thread client/internal/netflow/store/memory.go
Comment thread flow/proto/generate.sh Outdated
…mpv6

Signed-off-by: Dmitri <dmitri.external@netbird.io>
Signed-off-by: Dmitri <dmitri.external@netbird.io>
@dmitri-netbird dmitri-netbird changed the title introduce client-side event aggregation [client] introduce client-side event aggregation Jun 15, 2026
Signed-off-by: Dmitri <dmitri.external@netbird.io>
Signed-off-by: Dmitri <dmitri.external@netbird.io>
Signed-off-by: Dmitri <dmitri.external@netbird.io>
Signed-off-by: Dmitri <dmitri.external@netbird.io>

@coderabbitai coderabbitai Bot left a comment

Contributor


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@client/internal/netflow/store/event_aggregation_test.go`:
- Around line 45-47: The generateEvents function is being called inside a loop
with randomized DestPort values that can collide, causing test nondeterminism
when multiple event groups accidentally get the same port and merge. Modify the
generateEvents calls to use deterministic port values derived from the loop
variable i (such as basePort + i or similar) instead of relying on
randomization. This applies at both the loop starting at line 45-47 and the
similar aggregation test loop at lines 110-118 to ensure each generated event
group has a unique, collision-free port identifier.

In `@shared/management/http/api/openapi.yml`:
- Around line 2768-2779: The OpenAPI schema exposes three new counter fields
(num_of_starts, num_of_ends, num_of_drops) but the toProtoEvent function does
not map these counters to the corresponding proto fields
(FlowFields.NumOfStarts, FlowFields.NumOfEnds, FlowFields.NumOfDrops). Update
the toProtoEvent function to set these three FlowFields proto properties from
the aggregated counter values so the new API fields are properly populated
end-to-end.
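
For the port-collision concern raised for event_aggregation_test.go above, one possible shape of a deterministic port helper is sketched below. `basePort`, `portForGroup`, and `allDistinct` are hypothetical names picked for this illustration, and the sketch assumes the number of groups stays small enough that `basePort + i` fits in a `uint16`.

```go
package main

import "fmt"

// basePort is a hypothetical starting port chosen for this illustration.
const basePort = 10000

// portForGroup derives a unique destination port from the loop index, so two
// event groups can never share a port by accident.
func portForGroup(i int) uint16 {
	return uint16(basePort + i)
}

// allDistinct reports whether the first n group ports are collision-free.
func allDistinct(n int) bool {
	seen := make(map[uint16]struct{}, n)
	for i := 0; i < n; i++ {
		p := portForGroup(i)
		if _, dup := seen[p]; dup {
			return false
		}
		seen[p] = struct{}{}
	}
	return true
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(portForGroup(i))
	}
	fmt.Println(allDistinct(1000))
}
```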

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: baa642fa-3003-452a-a2ad-2b4283e82c15

📥 Commits

Reviewing files that changed from the base of the PR and between 598558c and 07c527f.

⛔ Files ignored due to path filters (2)
  • flow/proto/flow.pb.go is excluded by !**/*.pb.go
  • flow/proto/flow_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (4)
  • client/internal/netflow/store/event_aggregation_test.go
  • client/internal/netflow/store/memory.go
  • shared/management/http/api/openapi.yml
  • shared/management/http/api/types.gen.go
✅ Files skipped from review due to trivial changes (1)
  • shared/management/http/api/types.gen.go

Comment thread client/internal/netflow/store/event_aggregation_test.go Outdated
Comment thread shared/management/http/api/openapi.yml
Comment thread client/internal/netflow/manager.go Outdated

@riccardomanfrin riccardomanfrin left a comment

Contributor


Compared to what was there before, the aggregation key does not consider direction, source IP, or source port (credit to @lixmal).

I think the source port, being ephemeral, is OK to drop, but direction and source IP should probably be kept.

I think there might be one more small problem in general: all Event fields that originate from the traffic (i.e. not ID / Timestamp) but are not part of the aggregation key should ideally be removed from the sent event, which I suppose is not possible for backward compatibility. If that is not possible, they should at least be cleared before sending the event. Otherwise you are populating them with the values from the first event collected in the time window, which is not statistically meaningful.
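
A minimal sketch of the "clear before sending" idea, assuming (per the CodeRabbit walkthrough) that the key covers source IP, destination IP, destination port, protocol, and ICMP type, so that source port and ICMP code are the traffic-derived fields left outside it. The `event` type and `clearNonKeyFields` are hypothetical and do not mirror the real nftypes.Event.

```go
package main

import "fmt"

// event is a hypothetical, flattened view of an aggregated flow event.
type event struct {
	sourceIP   string
	sourcePort uint16
	destIP     string
	destPort   uint16
	protocol   uint8
	icmpType   uint8
	icmpCode   uint8
	rxBytes    uint64
}

// clearNonKeyFields returns a copy of e in which the traffic-derived fields
// assumed NOT to be part of the aggregation key are reset to zero, so the
// sent event does not carry values from an arbitrary first event.
func clearNonKeyFields(e event) event {
	e.sourcePort = 0
	e.icmpCode = 0
	return e
}

func main() {
	aggregated := event{
		sourceIP: "10.0.0.1", sourcePort: 51515,
		destIP: "10.0.0.2", destPort: 443,
		protocol: 6, rxBytes: 300,
	}
	fmt.Printf("%+v\n", clearNonKeyFields(aggregated))
}
```

Passing the event by value keeps the stored copy untouched, which may matter if the same record is later retried.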

Comment thread client/internal/netflow/store/memory.go Outdated
Comment thread client/internal/netflow/store/memory.go
Comment thread client/internal/netflow/manager.go Outdated
@dmitri-netbird

dmitri-netbird commented Jun 16, 2026

Author

> Compared to what was there before, the aggregation key does not consider direction, source IP, or source port (credit to @lixmal).

Nods, fixed this.

> I think there might be one more small problem in general: all Event fields that originate from the traffic (i.e. not ID / Timestamp) but are not part of the aggregation key should ideally be removed from the sent event, which I suppose is not possible for backward compatibility. If that is not possible, they should at least be cleared before sending the event. Otherwise you are populating them with the values from the first event collected in the time window, which is not statistically meaningful.

Nods, can't drop the fields, as it would be a breaking change. The fields that we are not explicitly aggregating on:

  • flowId -- I think it's used in quite a few places, but I think it's OK to keep whatever value we end up with; an alternative is to create a new UUID, but I don't see much of a difference (I don't think we can end up with the same flowId for multiple distinct aggregated flows)
  • ruleId -- if I'm reading the code correctly, it should be the same for a given dstAddr/port combination? Not sure if this is used for ICMP traffic?
  • SourceResourceID -- I think it's unique for a given src IP/port tuple; not sure what should be used instead? Possibly OK to reuse one of the resource IDs used in the aggregation?
  • DestResourceID -- should be the same for a given aggregated flow
  • ICMP code -- not sure what to do with this, could reset it to zero?
  • the rest of the fields we aggregate on

Got clarification re: SourceResourceID -- for a given src IP, dst IP, dst port, and direction, the resource ID should stay the same.

Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
@dmitri-netbird dmitri-netbird force-pushed the dmitri-event-aggregation branch from 450a879 to 1721a4f Compare June 16, 2026 15:08
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
@dmitri-netbird dmitri-netbird marked this pull request as ready for review June 17, 2026 08:18
@dmitri-netbird

Author

The proto version check is failing because the flow gRPC bindings (flow/proto/flow_grpc.pb.go) do not have the protoc metadata header in main/HEAD.

Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>

@coderabbitai coderabbitai Bot left a comment

Contributor


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
client/internal/netflow/manager.go (1)

54-60: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Clear un-acked events when flow is disabled or the receiver changes.

eventsWithoutAcks survives disableFlow and receiver resets. After disabling flow or changing URL/token, stale flow metadata can be retried to the next receiver; clear or reinitialize this store at those lifecycle boundaries.

Proposed fix
 	if m.receiverClient != nil {
 		if err := m.receiverClient.Close(); err != nil {
 			log.Warnf("error closing previous flow client: %v", err)
 		}
+		m.eventsWithoutAcks.Close()
 	}
@@
 	m.logger.Close()
+	m.eventsWithoutAcks.Close()
 
 	if m.receiverClient == nil {
 		return nil
 	}

Also applies to: 94-100, 143-150

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@client/internal/netflow/manager.go` around lines 54 - 60, The
eventsWithoutAcks store is not being cleared when the flow is disabled or the
receiver changes, which can cause stale event metadata to be retried to a new
receiver. Add logic to clear or reinitialize the eventsWithoutAcks store in the
disableFlow method (referenced at lines 94-100) and in the receiver change/reset
logic (referenced at lines 143-150) to ensure that pending events are discarded
when these lifecycle boundaries are crossed.
🧹 Nitpick comments (1)
client/internal/netflow/store/event_aggregation_test.go (1)

57-58: ⚡ Quick win

Add boundary coverage for source ports and ICMP codes.

The tests vary source IP and ICMP type, but never create same-source/same-destination TCP or UDP events with different source ports, and never set ICMPCode. Add those cases so aggregation-key regressions cannot collapse fields that are still serialized downstream.

Also applies to: 109-110, 129-165
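
A table-driven check along these lines could pin down which fields separate groups. It is only a sketch: `event`, `aggKey`, `keyOf`, and `countGroups` are hypothetical and do not mirror the PR's types, and whether the source port belongs in the key at all is the design question under review. If the key deliberately omits a field, the expected counts below would change accordingly.

```go
package main

import "fmt"

// event is a hypothetical, reduced flow event; field names are illustrative.
type event struct {
	protocol uint8 // IANA protocol number: 1 = ICMP, 6 = TCP, 17 = UDP
	srcIP    string
	srcPort  uint16
	dstIP    string
	dstPort  uint16
	icmpType uint8
	icmpCode uint8
	rxBytes  uint64
}

// aggKey lists the fields that keep two events in separate groups.
type aggKey struct {
	protocol uint8
	srcIP    string
	srcPort  uint16
	dstIP    string
	dstPort  uint16
	icmpType uint8
	icmpCode uint8
}

// keyOf copies the grouping fields of an event into a comparable key.
func keyOf(e event) aggKey {
	return aggKey{
		protocol: e.protocol,
		srcIP:    e.srcIP,
		srcPort:  e.srcPort,
		dstIP:    e.dstIP,
		dstPort:  e.dstPort,
		icmpType: e.icmpType,
		icmpCode: e.icmpCode,
	}
}

// countGroups reports how many aggregated events the input would produce.
func countGroups(events []event) int {
	groups := make(map[aggKey]uint64)
	for _, e := range events {
		groups[keyOf(e)] += e.rxBytes
	}
	return len(groups)
}

func main() {
	cases := []struct {
		name   string
		events []event
		want   int
	}{
		{"tcp, different source ports", []event{
			{protocol: 6, srcIP: "10.0.0.1", srcPort: 40000, dstIP: "10.0.0.2", dstPort: 443},
			{protocol: 6, srcIP: "10.0.0.1", srcPort: 40001, dstIP: "10.0.0.2", dstPort: 443},
		}, 2},
		{"icmp, different codes", []event{
			{protocol: 1, srcIP: "10.0.0.1", dstIP: "10.0.0.2", icmpType: 3, icmpCode: 0},
			{protocol: 1, srcIP: "10.0.0.1", dstIP: "10.0.0.2", icmpType: 3, icmpCode: 1},
		}, 2},
	}
	for _, c := range cases {
		fmt.Printf("%s: got %d groups, want %d\n", c.name, countGroups(c.events), c.want)
	}
}
```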

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@client/internal/netflow/store/event_aggregation_test.go` around lines 57 -
58, Expand the test cases in the event aggregation test to include boundary
coverage for source ports and ICMP codes. Specifically, add test scenarios where
same-source and same-destination TCP or UDP events have different source ports
(by modifying the generateEvents calls) and add test cases where ICMPCode is set
on ICMP events. This ensures that aggregation keys properly distinguish events
based on these fields and prevents regressions where fields still used
downstream could be incorrectly collapsed during aggregation. Apply these
additions across all relevant test cases including the ones around lines 57-58,
109-110, and 129-165.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@client/internal/netflow/manager.go`:
- Line 254: The timer initialization at line 254 respects the configurable
m.retryInterval for the first attempt, but line 283 hard-codes a one-second
duration when resetting the timer in the retry loop. Replace the hard-coded
one-second interval at line 283 with m.retryInterval to ensure the configurable
retry interval is applied consistently across all retry attempts, not just the
initial one.
- Around line 251-253: The eventsWithoutAcks store in the Manager.startRetries
method is unbounded and grows indefinitely when ACKs stop arriving, consuming
unbounded client memory during receiver outages. Add a maximum size limit and
age-based eviction policy to the eventsWithoutAcks store to prevent unlimited
growth, implementing logic to drop the oldest events when the threshold is
reached. Apply this bounded store policy consistently across all event storage
and retry logic throughout the Manager implementation.

In `@client/internal/netflow/store/memory.go`:
- Around line 77-84: The aggregationKey struct is missing the sourcePort field,
which causes TCP/UDP events from the same source IP but different source ports
to be incorrectly aggregated together. Add a sourcePort field of type uint16 to
the aggregationKey struct (similar to the existing destPort field), then ensure
this field is properly populated when creating aggregation keys in the grouping
logic around line 93 and when building keys around lines 140-144, so that events
are only aggregated when they match on source IP, source port, destination
IP/port, direction, and protocol.
- Line 84: The unsupported-protocol handling branch returns before assigning
WindowStart, WindowEnd, and single-event counter values, causing toProtoEvent to
serialize zero values for these metadata fields. Additionally, the unique field
assignment uses time.Now().UnixNano() for uniqueness instead of the event ID.
Ensure that all unsupported/pass-through protocol events have proper
WindowStart, WindowEnd, and counter assignments before returning, and replace
the time.Now().UnixNano() call with the actual event ID when setting the unique
field to properly track event identity regardless of protocol type.
- Around line 69-72: Capture the current time once before creating the
AggregatingMemory object in the window rotation logic. Instead of calling
time.Now() twice (once for WindowEnd when creating the toret AggregatingMemory
and once for WindowStart when resetting am.WindowStart), assign time.Now() to a
variable like now, then use this same now variable for both the WindowEnd field
in the AggregatingMemory initialization and the am.WindowStart assignment. This
ensures consistent timestamp boundaries and eliminates any time gap between the
closing and opening of aggregation windows.
- Around line 112-118: The ICMPCode field is not included in the aggregation key
for ICMP events, but it is still serialized by toProtoEvent, which leaves an
arbitrary code value in the aggregated output. In the code section where
event.Type is set to types.TypeUnknown (around line 112), explicitly reset the
ICMPCode field to its zero value (0 or nil depending on the field type) to
ensure consistency with the type-level aggregation. This should be done
alongside the existing assignments to event.WindowStart and event.WindowEnd to
maintain data integrity.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 22857878-55b9-4398-8983-8a27c9b79e09

📥 Commits

Reviewing files that changed from the base of the PR and between 598558c and 17cc13f.

⛔ Files ignored due to path filters (2)
  • flow/proto/flow.pb.go is excluded by !**/*.pb.go
  • flow/proto/flow_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (8)
  • client/internal/netflow/manager.go
  • client/internal/netflow/store/event_aggregation_test.go
  • client/internal/netflow/store/memory.go
  • client/internal/netflow/types/types.go
  • flow/proto/flow.proto
  • flow/proto/generate.sh
  • shared/management/http/api/openapi.yml
  • shared/management/http/api/types.gen.go
✅ Files skipped from review due to trivial changes (1)
  • flow/proto/generate.sh
🚧 Files skipped from review as they are similar to previous changes (1)
  • client/internal/netflow/types/types.go

Comment thread client/internal/netflow/manager.go Outdated
Comment thread client/internal/netflow/manager.go
Comment thread client/internal/netflow/store/memory.go Outdated
Comment thread client/internal/netflow/store/memory.go Outdated
Comment thread client/internal/netflow/store/memory.go Outdated
Comment thread client/internal/netflow/store/memory.go
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
Comment thread client/internal/netflow/manager.go Outdated
InitialInterval: 1 * time.Second,
RandomizationFactor: 0.5,
Multiplier: 1.7,
MaxInterval: m.flowConfig.Interval / 2,

Collaborator


.Interval should be accessed under a lock

@dmitri-netbird dmitri-netbird Jun 22, 2026

Author


This is an interesting one, actually: the Go race detector doesn't report it, since triggering the race would require a rapid succession of FlowConfig updates, which we don't test. It's probably not a common case, but I fixed all accesses to FlowConfig.Interval (I don't see anything else from that struct being accessed outside of the lock) by passing a copy of the value to the goroutines.

Comment thread flow/proto/generate.sh Outdated
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>

@coderabbitai coderabbitai Bot left a comment

Contributor


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@client/internal/netflow/store/event_aggregation_test.go`:
- Line 157: The subtest name in the t.Run call is using
string(types.ProtocolUnknown) which produces a NUL-character string instead of a
readable label. Replace string(types.ProtocolUnknown) in the subtest name
concatenation with a proper readable string representation of the unknown
protocol type, such as a string constant or by calling an appropriate String()
method if available on the types.ProtocolUnknown value.
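
The difference is easy to reproduce outside the test. The `Protocol` type, its constants, and its `String()` method below are assumptions that only mimic a numeric protocol type; the real `types` package may differ. `string(rune(p))` is used to spell out explicitly what a direct `string(p)` conversion of an integer does.

```go
package main

import "fmt"

// Protocol is a hypothetical numeric protocol type.
type Protocol uint8

const (
	ProtocolUnknown Protocol = 0
	ProtocolICMP    Protocol = 1
	ProtocolTCP     Protocol = 6
	ProtocolUDP     Protocol = 17
)

// String returns a readable label for use in logs and subtest names.
func (p Protocol) String() string {
	switch p {
	case ProtocolICMP:
		return "ICMP"
	case ProtocolTCP:
		return "TCP"
	case ProtocolUDP:
		return "UDP"
	default:
		return "UNKNOWN"
	}
}

func main() {
	// Converting an integer to string yields the UTF-8 encoding of that
	// code point, not its decimal digits. For 0 that is a single NUL byte.
	raw := string(rune(ProtocolUnknown))
	fmt.Printf("raw conversion: %q (length %d)\n", raw, len(raw))

	// The String method gives a label that reads well in test output.
	fmt.Printf("readable name:  %q\n", "proto-"+ProtocolUnknown.String())
}
```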

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d74f843b-4ccc-4803-9ff1-3f2fada3201f

📥 Commits

Reviewing files that changed from the base of the PR and between 17cc13f and 41a15f6.

📒 Files selected for processing (2)
  • client/internal/netflow/store/event_aggregation_test.go
  • client/internal/netflow/store/memory.go

Comment thread client/internal/netflow/store/event_aggregation_test.go Outdated
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
@sonarqubecloud


4 participants